/*
 * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
 * <p>
 * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.gnu.org/licenses/lgpl.html
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.piggy.mq.vender.rabbitmq;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.piggy.mq.constant.MQVender;
import com.piggy.mq.enums.MQSendTypeEnum;
import com.piggy.mq.model.AbstractMQ;
import com.piggy.mq.scan.ScanMQBeanPackage;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
 * RabbitMQ的配置项
 * 1. 注册全部定义好的Queue Bean
 * 2. 动态注册fanout交换机
 * 3. 将Queue模式绑定到延时消息的交换机
 *
 * @author terrfly
 * @site https://www.jeequan.com
 * @date 2021/7/23 16:33
 */
@Component
@ConditionalOnProperty(name = MQVender.YML_VENDER_KEY, havingValue = MQVender.RABBIT_MQ)
public class RabbitMQConfig {

    /**
     * 全局定义延迟交换机名称
     **/
    public static final String DELAYED_EXCHANGE_NAME = "delayedExchange";

    /**
     * 注入延迟交换机Bean
     **/
    @Resource
    @Qualifier(DELAYED_EXCHANGE_NAME)
    private CustomExchange delayedExchange;

    /**
     * 注入rabbitMQBeanProcessor
     **/
    @Resource
    private RabbitMQBeanProcessor rabbitMQBeanProcessor;

    /**
     * 在全部bean注册完成后再执行
     **/
    @PostConstruct
    public void init() {
        String[] packages = ScanMQBeanPackage.scanPackage();
        // 获取到所有的MQ定义
        Set<Class<?>> set = new LinkedHashSet<>();
        for(String pkg : packages) {
            Set<Class<?>> clsset = ClassUtil.scanPackageBySuper(pkg, AbstractMQ.class);
            if (CollUtil.isNotEmpty(clsset)) {
                set.addAll(clsset);
            }
        }

        for (Class<?> aClass : set) {

            // 实例化
            AbstractMQ amq = (AbstractMQ) ReflectUtil.newInstance(aClass);

            if (StrUtil.isNotBlank(amq.getMQName())) {
                // 注册Queue === new Queue(name)，  queue名称/bean名称 = mqName
                rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName(),
                        BeanDefinitionBuilder.rootBeanDefinition(Queue.class, ()-> {
                            if (MapUtil.isEmpty(amq.getMQArgs())) {
                                return QueueBuilder.durable(amq.getMQName()).build();
                            }
                            Map<String, Object> args = new HashMap<>();
                            if (amq.getMQType().equals(MQSendTypeEnum.DELAY)) {
                                //设置死信交换机
                                args.put("x-dead-letter-exchange", DELAYED_EXCHANGE_NAME);
                                args.put("x-message-ttl", amq.getMQArgs().get("ttl"));
                            }
                            return QueueBuilder.durable(amq.getMQName()).withArguments(args).build();
                        }).getBeanDefinition()
                );
            }

            // 广播模式
            if (amq.getMQType() == MQSendTypeEnum.BROADCAST) {
                if (StrUtil.isAllNotBlank(amq.getExchangeName(), amq.getExchangeType())) {
                    // 动态注册交换机， 交换机名称/bean名称 =  FANOUT_EXCHANGE_NAME_PREFIX + amq.getMQName()
                    rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getExchangeName(),
                            BeanDefinitionBuilder.genericBeanDefinition(CustomExchange.class, () -> {
                                        // 普通Exchange 交换机
                                        return new CustomExchange(amq.getExchangeName(), amq.getExchangeType(), true, false);
                                    }

                            ).getBeanDefinition()
                    );

                }

                if (StrUtil.isAllNotBlank(amq.getMQName(), amq.getExchangeName(), amq.routingKey())) {
                    // 延迟交换机与Queue进行绑定， 绑定Bean名称 = mq-exchange-bind
                    rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(String.format("%s-%s-%s", amq.getMQName(), amq.getExchangeName(), "Bind"),
                            BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () ->
                                    BindingBuilder.bind(SpringUtil.getBean(amq.getMQName(), Queue.class)).to(SpringUtil.getBean(amq.getExchangeName(), CustomExchange.class)).with(amq.routingKey()).noargs()
                            ).getBeanDefinition()
                    );
                }

            } else {
                if (StrUtil.isNotBlank(amq.getMQName())) {
                    // 延迟交换机与Queue进行绑定， 绑定Bean名称 = mqName_DelayedBind
                    rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(String.format("%s-%s", amq.getMQName(), "DelayedBind"),
                            BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () ->
                                    BindingBuilder.bind(SpringUtil.getBean(amq.getMQName(), Queue.class)).to(delayedExchange).with(amq.getMQName()).noargs()

                            ).getBeanDefinition()
                    );
                }
            }
        }
    }

}
